Skip to content

fix(agent-runtime): consolidate streaming text content blocks for storage#425

Open
jerryliang64 wants to merge 1 commit intomasterfrom
fix/consolidate-stream-content-blocks
Open

fix(agent-runtime): consolidate streaming text content blocks for storage#425
jerryliang64 wants to merge 1 commit intomasterfrom
fix/consolidate-stream-content-blocks

Conversation

@jerryliang64
Copy link
Copy Markdown
Contributor

@jerryliang64 jerryliang64 commented Mar 31, 2026

Summary

  • When an app yields many small streaming chunks via execRun, each chunk was stored as a separate text content block in the thread message
  • GET /threads/:id returned fragmented content (dozens of tiny text blocks instead of one consolidated block)
  • Added MessageConverter.consolidateContentBlocks() which merges adjacent text blocks into a single block
  • Applied in both streamRun (via consumeStreamMessages) and syncRun/asyncRun (via extractFromStreamMessages) paths
  • SSE streaming deltas remain granular — only stored messages are consolidated
  • This matches the OpenAI Assistants API behavior

Test plan

  • consolidateContentBlocks: 6 unit tests (merge, annotations, empty, single, many, no-mutation)
  • extractFromStreamMessages: updated existing tests + added streaming consolidation test
  • syncRun: integration test verifying chunks consolidated into single message in run output and thread history
  • asyncRun: integration test verifying chunks consolidated into single message in run output and thread history
  • streamRun: integration test verifying SSE deltas remain granular (3 events) while stored message has consolidated content (1 block)
  • All 114 tests passing

🤖 Generated with Claude Code

Summary by CodeRabbit

  • Bug Fixes
    • Streamed assistant text fragments are now merged into single consolidated message blocks; adjacent text annotations are combined and persisted as one final message.
    • Thread history and saved run outputs store the consolidated content (reducing fragmented entries), while live SSE/delta events remain granular for real-time updates.
  • Tests
    • Added tests validating consolidation behavior, persisted outputs, and that streaming events stay fine-grained.

@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Mar 31, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 61b303f5-c16d-4110-9bba-431befc373c5

📥 Commits

Reviewing files that changed from the base of the PR and between fdd644a and 4493993.

📒 Files selected for processing (4)
  • core/agent-runtime/src/AgentRuntime.ts
  • core/agent-runtime/src/MessageConverter.ts
  • core/agent-runtime/test/AgentRuntime.test.ts
  • core/agent-runtime/test/MessageConverter.test.ts
✅ Files skipped from review due to trivial changes (1)
  • core/agent-runtime/src/AgentRuntime.ts
🚧 Files skipped from review as they are similar to previous changes (1)
  • core/agent-runtime/test/MessageConverter.test.ts

📝 Walkthrough

Walkthrough

AgentRuntime and MessageConverter now consolidate streamed message content: streamed MessageContentBlock arrays are accumulated, adjacent text blocks are merged (texts concatenated, annotations combined), and consumers receive a single consolidated content array instead of fragmented blocks.

Changes

Cohort / File(s) Summary
Core consolidation logic
core/agent-runtime/src/MessageConverter.ts
Added consolidateContentBlocks(blocks: MessageContentBlock[]) to merge adjacent ContentBlockType.Text blocks (concatenate text.value, merge text.annotations). extractFromStreamMessages now accumulates all blocks across stream messages and returns at most one consolidated ThreadMessage (or []).
Runtime integration
core/agent-runtime/src/AgentRuntime.ts
consumeStreamMessages now post-processes accumulated content via MessageConverter.consolidateContentBlocks(content) before returning { content, usage, aborted }.
Tests
core/agent-runtime/test/AgentRuntime.test.ts, core/agent-runtime/test/MessageConverter.test.ts
Added tests for consolidateContentBlocks; updated stream extraction tests to expect one consolidated message; added sync/async/stream run tests verifying SSE deltas remain per-chunk while persisted/completed messages are consolidated; immutability and non-text-boundary cases covered.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant AgentRuntime
    participant MessageConverter
    participant Storage
    Client->>AgentRuntime: send stream of AgentStreamMessage chunks
    AgentRuntime->>AgentRuntime: accumulate incoming messages & usage
    AgentRuntime->>MessageConverter: extractFromStreamMessages(accumulated)
    MessageConverter->>MessageConverter: gather all content blocks
    MessageConverter->>MessageConverter: consolidateContentBlocks(blocks)
    MessageConverter-->>AgentRuntime: return single consolidated ThreadMessage (or none)
    AgentRuntime->>Storage: persist ThreadMessageCompleted / update thread history
    AgentRuntime-->>Client: emit SSE deltas (per-chunk) and final consolidated completion
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Poem

I nibble chunks and stitch them tight,
From scattered whispers I make one light.
Hello, world—no gaps between,
One smooth message, neat and clean. 🐇✨

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly describes the main change: consolidating streaming text content blocks before storage to fix fragmented content issues.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix/consolidate-stream-content-blocks

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces logic to consolidate adjacent text content blocks into a single block, matching the OpenAI Assistants API behavior for streaming chunks. The implementation adds a consolidateContentBlocks utility to MessageConverter and integrates it into AgentRuntime. Feedback indicates that the merging logic currently fails to adjust annotation offsets for the combined text and lacks proper type checking for non-text blocks, which could lead to incorrect data or runtime crashes.

Comment on lines +54 to +58
if (prev && prev.type === ContentBlockType.Text && block.type === ContentBlockType.Text) {
prev.text = {
value: prev.text.value + block.text.value,
annotations: [...prev.text.annotations, ...block.text.annotations],
};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

When merging adjacent text blocks, the annotations from the subsequent block are appended without adjusting their character offsets. Since these indices (e.g., start, end, or start_index) are relative to the beginning of the text in their original block, they will point to incorrect positions in the newly consolidated text string. You should iterate through the annotations of the subsequent block and increment their indices by the length of the existing text in prev to ensure they remain valid.

Comment on lines +60 to +63
result.push({
...block,
text: { ...block.text, annotations: [...block.text.annotations] },
});
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The else block assumes that every MessageContentBlock has a text property. While MessageContentBlock is currently only defined as TextContentBlock, the comments and the logic in this function suggest it is intended to handle other types (like images) as natural boundaries. If a non-text block is encountered, accessing block.text.annotations will cause a runtime crash. You should check the block type before attempting to deep-copy the text and annotations properties.

        result.push({
          ...block,
          ...(block.type === ContentBlockType.Text ? {
            text: { ...block.text, annotations: [...block.text.annotations] },
          } : {}),
        });

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@core/agent-runtime/src/MessageConverter.ts`:
- Around line 46-67: The consolidateContentBlocks function merges adjacent text
blocks but fails to shift the position-based annotation indices from the
concatenated block; update MessageConverter.consolidateContentBlocks so when
merging two ContentBlockType.Text blocks you compute an offset =
prev.text.value.length and map each annotation from the current block to a new
annotation with start+offset and end+offset (while copying other annotation
fields), then append those shifted annotations to prev.text.annotations; ensure
you never mutate original blocks by creating new objects for prev and for pushed
blocks and deep-copy annotation arrays when creating the non-merged branch.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: b5cd8952-699f-4d29-bb27-fef37bc942c1

📥 Commits

Reviewing files that changed from the base of the PR and between 0c77cd1 and ccbc4d1.

📒 Files selected for processing (4)
  • core/agent-runtime/src/AgentRuntime.ts
  • core/agent-runtime/src/MessageConverter.ts
  • core/agent-runtime/test/AgentRuntime.test.ts
  • core/agent-runtime/test/MessageConverter.test.ts

Comment on lines +46 to +67
/**
* Merge adjacent text content blocks into a single block.
* Non-text blocks act as natural boundaries and are never merged.
*/
static consolidateContentBlocks(blocks: MessageContentBlock[]): MessageContentBlock[] {
const result: MessageContentBlock[] = [];
for (const block of blocks) {
const prev = result[result.length - 1];
if (prev && prev.type === ContentBlockType.Text && block.type === ContentBlockType.Text) {
prev.text = {
value: prev.text.value + block.text.value,
annotations: [...prev.text.annotations, ...block.text.annotations],
};
} else {
result.push({
...block,
text: { ...block.text, annotations: [...block.text.annotations] },
});
}
}
return result;
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Annotation position indices are not adjusted during text concatenation.

The consolidation logic concatenates text.value strings and merges annotations arrays, but annotations contain position-based indices ({ start, end }) that reference character offsets within their original text block. After merging 'a' + 'b' into 'ab', the second block's annotation { start: 0, end: 1 } still points to offset 0, but it should now point to offset 1 (the position of 'b' in the merged string).

This will cause any downstream consumers that rely on annotation positions (e.g., for highlighting, links, or citations) to reference incorrect character ranges in the consolidated text.

Please verify whether annotation indices are consumed downstream and need adjustment:

#!/bin/bash
# Search for usages of annotations in the codebase
rg -n -C3 'annotations' --type=ts -g '!**/test/**' -g '!**/node_modules/**'
How does the OpenAI Assistants API handle annotation indices when streaming text deltas are consolidated into a final message?
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/agent-runtime/src/MessageConverter.ts` around lines 46 - 67, The
consolidateContentBlocks function merges adjacent text blocks but fails to shift
the position-based annotation indices from the concatenated block; update
MessageConverter.consolidateContentBlocks so when merging two
ContentBlockType.Text blocks you compute an offset = prev.text.value.length and
map each annotation from the current block to a new annotation with start+offset
and end+offset (while copying other annotation fields), then append those
shifted annotations to prev.text.annotations; ensure you never mutate original
blocks by creating new objects for prev and for pushed blocks and deep-copy
annotation arrays when creating the non-merged branch.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
core/agent-runtime/src/MessageConverter.ts (1)

59-64: Defensive guard for non-text blocks would improve robustness.

The doc comment states non-text blocks act as boundaries, but the else branch unconditionally accesses block.text. Currently safe since MessageContentBlock = TextContentBlock, but if other content types (Image, File, etc.) are added later, this will throw.

🛡️ Proposed defensive handling
       } else {
-        result.push({
-          ...block,
-          text: { ...block.text, annotations: [ ...block.text.annotations ] },
-        });
+        if (block.type === ContentBlockType.Text) {
+          result.push({
+            ...block,
+            text: { ...block.text, annotations: [ ...block.text.annotations ] },
+          });
+        } else {
+          // Non-text blocks are preserved as-is
+          result.push({ ...block });
+        }
       }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/agent-runtime/src/MessageConverter.ts` around lines 59 - 64, The else
branch in MessageConverter where you push { ...block, text: { ...block.text,
annotations: [...] } } assumes block.text exists; add a defensive guard to
handle non-text MessageContentBlock variants by checking block.type or the
presence of block.text before accessing it. If block.text is present
(TextContentBlock), keep the existing cloning logic (preserve annotations),
otherwise push a shallow clone of block without touching text (or include a safe
default like text: undefined) so non-text blocks (e.g., Image/File) act as
boundaries without throwing; update the logic around result.push(...) in the
MessageConverter handling to reflect this guard.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@core/agent-runtime/src/MessageConverter.ts`:
- Around line 59-64: The else branch in MessageConverter where you push {
...block, text: { ...block.text, annotations: [...] } } assumes block.text
exists; add a defensive guard to handle non-text MessageContentBlock variants by
checking block.type or the presence of block.text before accessing it. If
block.text is present (TextContentBlock), keep the existing cloning logic
(preserve annotations), otherwise push a shallow clone of block without touching
text (or include a safe default like text: undefined) so non-text blocks (e.g.,
Image/File) act as boundaries without throwing; update the logic around
result.push(...) in the MessageConverter handling to reflect this guard.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: ed898a2b-a45b-421d-9c09-a4b8ceaae590

📥 Commits

Reviewing files that changed from the base of the PR and between ccbc4d1 and fdd644a.

📒 Files selected for processing (1)
  • core/agent-runtime/src/MessageConverter.ts

Copy link
Copy Markdown
Contributor

@akitaSummer akitaSummer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1

…rage

When an app yields many small streaming chunks via execRun, each chunk
was stored as a separate text content block in the thread message. This
caused GET /threads/:id to return fragmented content (dozens of tiny
text blocks instead of one consolidated block).

This fix adds MessageConverter.consolidateContentBlocks() which merges
adjacent text blocks into a single block. It is applied in:

- consumeStreamMessages (streamRun path): consolidates before storing
  the completed message, while SSE deltas remain granular
- extractFromStreamMessages (syncRun/asyncRun path): consolidates all
  chunks into a single MessageObject instead of creating one per yield

This matches the OpenAI Assistants API behavior where streaming deltas
are fine-grained but stored messages have consolidated content.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@jerryliang64 jerryliang64 force-pushed the fix/consolidate-stream-content-blocks branch from fdd644a to 4493993 Compare April 1, 2026 07:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants